"""
Case Type   : Compress
Case Name   : 不同会话并发执行LZ_COMPRESS_CLOSE执行成功
Create At   : 2024/11/12
Owner       : xiaqi14
Description :
              1.使用兼容A库创建压缩插件
              drop database if exists test_compress_25;
              create database test_compress_25 dbcompatibility = 'A';
              create extension gms_compress;
              2.不同会话并发执行LZ_COMPRESS_CLOSE
              DECLARE
                content BLOB;
                handle int;
              BEGIN 
                content := '1234';
                handle := GMS_COMPRESS.LZ_COMPRESS_OPEN(content);
                GMS_COMPRESS.LZ_COMPRESS_CLOSE(handle);
              END;
              / 
              3.清理环境
Expect      :
              1.成功
              2.成功
              3.成功
History     :
"""

import unittest

from testcase.utils.CommonSH import CommonSH
from testcase.utils.Constant import Constant
from testcase.utils.Logger import Logger
from testcase.utils.ComThread import ComThread
from yat.test import Node
from yat.test import macro



class Timescaledb(unittest.TestCase):
    """Concurrent GMS_COMPRESS.LZ_COMPRESS_CLOSE test (Compress case 0025).

    The test creates an A-compatible database, installs the ``gms_compress``
    extension in it, and then runs the same anonymous block (open a
    compression handle, close it again) from several gsql sessions started
    in parallel threads. Every session is expected to succeed.

    NOTE(review): the class and test-method names look like leftovers from a
    copied test case; they are kept unchanged so existing test IDs stay stable.
    """

    # Number of gsql sessions started in parallel.
    SESSION_COUNT = 5
    # Seconds to wait for each session thread before giving up on it.
    JOIN_TIMEOUT = 5

    def setUp(self):
        """Create the helpers used by the test and check the cluster state."""
        self.log = Logger()
        self.log.info('==Opengauss_Function_Compress_Case0025开始==')
        # Both spellings are kept as attributes; they share one instance.
        self.constant = Constant()
        self.Constant = self.constant
        self.primary_sh = CommonSH('PrimaryDbUser')
        self.db_user_node = Node(node='PrimaryDbUser')
        self.db_name = "test_compress_25"
        status = self.primary_sh.get_db_cluster_status()
        self.assertTrue("Normal" in status or "Degraded" in status)

    def test_add_dimension(self):
        """Run LZ_COMPRESS_OPEN/LZ_COMPRESS_CLOSE from concurrent sessions."""
        text = '----step1: 使用兼容A库创建压缩插件 expect: 成功----'
        self.log.info(text)
        self.log.info("检验创建兼容A库功能是否正常")
        drop_database_sql = "drop database if exists {};".format(self.db_name)
        drop_database_result = self.primary_sh.execut_db_sql(
            drop_database_sql)
        self.assertIn(self.Constant.DROP_DATABASE_SUCCESS,
                      drop_database_result, "执行失败")
        create_database_sql = \
            "create database {} dbcompatibility = 'A';".format(self.db_name)
        create_database_result = self.primary_sh.execut_db_sql(
            create_database_sql)
        self.assertIn(self.Constant.CREATE_DATABASE_SUCCESS,
                      create_database_result, "执行失败")
        show_compatibility_result = self.primary_sh.execut_db_sql(
            "show sql_compatibility;", dbname=self.db_name)
        self.assertIn('A', show_compatibility_result, "执行失败")

        self.log.info("使用兼容A库创建压缩插件")
        drop_extension_result = self.primary_sh.execut_db_sql(
            "drop extension if exists gms_compress;", dbname=self.db_name)
        self.assertIn(self.Constant.drop_extension_success,
                      drop_extension_result)
        create_extension_result = self.primary_sh.execut_db_sql(
            "create extension gms_compress;", dbname=self.db_name)
        self.assertIn(self.Constant.extension_create_success,
                      create_extension_result, "执行失败")

        text = '----step2: 不同会话并发执行LZ_COMPRESS_CLOSE expect:成功----'
        self.log.info(text)
        # The statement and the shell command are identical for every
        # session, so they are built once instead of once per thread.
        test_sql = " ".join([
            "DECLARE",
            "content BLOB;",
            "handle int;",
            "BEGIN",
            "content := '1234';",
            "handle := GMS_COMPRESS.LZ_COMPRESS_OPEN(content);",
            "GMS_COMPRESS.LZ_COMPRESS_CLOSE(handle);",
            "END;",
        ])
        order = 'source {}; gsql -d {} -p {} -c "{}"'.format(
            macro.DB_ENV_PATH, self.db_name, self.db_user_node.db_port,
            test_sql)

        sessions = []
        for _ in range(self.SESSION_COUNT):
            self.log.info(order)
            sessions.append(ComThread(self.db_user_node.sh, args=(order,)))
        self.log.info("生成5个线程")
        for session in sessions:
            # Thread.setDaemon() is deprecated; the attribute is equivalent.
            # Assumes ComThread is a threading.Thread subclass, as its use of
            # setDaemon/start/join in this test suggests.
            session.daemon = True
            session.start()

        thread_results = []
        for index, session in enumerate(sessions):
            session.join(self.JOIN_TIMEOUT)
            # Fail with a clear message instead of reading the result of a
            # session that has not finished yet.
            self.assertFalse(
                session.is_alive(),
                "session {} did not finish within {}s".format(
                    index, self.JOIN_TIMEOUT))
            output = session.get_result().result()
            self.log.info(output)
            thread_results.append(output)
        self.log.info(thread_results)

        # Check every session on its own: looking for the success marker in
        # the combined output would pass even if only one session succeeded.
        for output in thread_results:
            self.assertNotIn('ERROR', str(output), '执行失败')
            self.assertIn('ANONYMOUS BLOCK EXECUTE', str(output), '执行失败')

    def tearDown(self):
        """Drop the extension and the test database, then verify both drops."""
        text = '----step3: 清理环境 expect: 成功----'
        self.log.info(text)
        # Run both cleanup statements before asserting, so that a failed
        # extension drop cannot leave the test database behind.
        drop_extension_result = self.primary_sh.execut_db_sql(
            "drop extension if exists gms_compress;", dbname=self.db_name)
        drop_database_result = self.primary_sh.execut_db_sql(
            "drop database if exists {};".format(self.db_name))
        self.assertIn(self.Constant.drop_extension_success,
                      drop_extension_result)
        self.assertIn(self.Constant.DROP_DATABASE_SUCCESS,
                      drop_database_result, "执行失败")
        self.log.info('==Opengauss_Function_Compress_Case0025完成==')
